// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/2be2db77/pkg/compact/compact.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Thanos Authors.

package compactor

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/cancellation"
	"github.com/grafana/dskit/concurrency"
	"github.com/grafana/dskit/multierror"
	"github.com/grafana/dskit/runutil"
	"github.com/oklog/ulid/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/objstore/providers/filesystem"
	"go.uber.org/atomic"

	"github.com/grafana/mimir/pkg/storage/indexheader"
	"github.com/grafana/mimir/pkg/storage/sharding"
	"github.com/grafana/mimir/pkg/storage/tsdb/block"
)

var errCompactionIterationCancelled = cancellation.NewErrorf("compaction iteration cancelled")
var errCompactionIterationStopped = cancellation.NewErrorf("compaction iteration stopped")

type deduplicateFilter interface {
	block.MetadataFilter

	// DuplicateIDs returns IDs of duplicate blocks generated by the last call to the Filter method.
	DuplicateIDs() []ulid.ULID
}

// metaSyncer synchronizes block metas from a bucket and keeps the most recently
// fetched set in memory, to be grouped into compaction jobs by a Grouper.
type metaSyncer struct {
	logger                  log.Logger
	bkt                     objstore.Bucket
	fetcher                 *block.MetaFetcher
	mtx                     sync.Mutex
	blocks                  map[ulid.ULID]*block.Meta
	metrics                 *syncerMetrics
	deduplicateBlocksFilter deduplicateFilter
}

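// syncerMetrics holds the metrics tracked by metaSyncer.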
type syncerMetrics struct {
	garbageCollections        prometheus.Counter
	garbageCollectionFailures prometheus.Counter
	garbageCollectionDuration prometheus.Histogram
	blocksMarkedForDeletion   prometheus.Counter
}

func newSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter) *syncerMetrics {
	var m syncerMetrics

	m.garbageCollections = promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "thanos_compact_garbage_collection_total",
		Help: "Total number of garbage collection operations.",
	})
	m.garbageCollectionFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "thanos_compact_garbage_collection_failures_total",
		Help: "Total number of failed garbage collection operations.",
	})
	m.garbageCollectionDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
		Name:    "thanos_compact_garbage_collection_duration_seconds",
		Help:    "Time it took to perform garbage collection iteration.",
		Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720},
	})

	m.blocksMarkedForDeletion = blocksMarkedForDeletion

	return &m
}

// newMetaSyncer returns a new metaSyncer for the given bucket and meta fetcher.
// Blocks must be at least as old as the sync delay to be considered.
func newMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher *block.MetaFetcher, deduplicateBlocksFilter deduplicateFilter, blocksMarkedForDeletion prometheus.Counter) (*metaSyncer, error) {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	return &metaSyncer{
		logger:                  logger,
		bkt:                     bkt,
		fetcher:                 fetcher,
		blocks:                  map[ulid.ULID]*block.Meta{},
		metrics:                 newSyncerMetrics(reg, blocksMarkedForDeletion),
		deduplicateBlocksFilter: deduplicateBlocksFilter,
	}, nil
}

// SyncMetas synchronizes the local state of block metas with what we have in the bucket.
func (s *metaSyncer) SyncMetas(ctx context.Context) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	// While fetching blocks, we filter out blocks marked for deletion.
	// No deletion delay is used: all blocks with a deletion marker are ignored and not considered for compaction.
	metas, _, err := s.fetcher.FetchWithoutMarkedForDeletion(ctx)
	if err != nil {
		return err
	}
	s.blocks = metas
	return nil
}

// Metas returns the block metas loaded by the last call to SyncMetas.
func (s *metaSyncer) Metas() map[ulid.ULID]*block.Meta {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	return s.blocks
}

// GarbageCollect marks blocks for deletion from the bucket if their data is available as part of a
// block with a higher compaction level.
// A prior call to SyncMetas is required to populate the duplicate IDs in deduplicateBlocksFilter.
func (s *metaSyncer) GarbageCollect(ctx context.Context) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	begin := time.Now()

	// The deduplication filter is applied after all blocks marked for deletion have been excluded
	// (with no deletion delay), so we expect that none of the duplicated blocks have been marked for
	// deletion yet. Even in the unlikely case that some of these blocks have already been marked for
	// deletion, the block.MarkForDeletion() call handles it correctly.
	duplicateIDs := s.deduplicateBlocksFilter.DuplicateIDs()

	for _, id := range duplicateIDs {
		if ctx.Err() != nil {
			return ctx.Err()
		}

		// Spawn a new context so we always mark a block for deletion in full on shutdown.
		delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)

		level.Info(s.logger).Log("msg", "marking outdated block for deletion", "block", id)
		err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id, "outdated block", s.metrics.blocksMarkedForDeletion)
		cancel()
		if err != nil {
			s.metrics.garbageCollectionFailures.Inc()
			return fmt.Errorf("mark block %s for deletion: %w", id, err)
		}

		// Immediately update our in-memory state so no further call to SyncMetas is needed
		// after running garbage collection.
		delete(s.blocks, id)
	}
	s.metrics.garbageCollections.Inc()
	s.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds())
	return nil
}
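
// Typical usage (sketch, error handling elided): SyncMetas populates the dedup
// filter as a side effect of fetching, after which GarbageCollect can mark the
// superseded duplicates:
//
//	_ = syncer.SyncMetas(ctx)
//	_ = syncer.GarbageCollect(ctx)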

// Grouper is responsible for grouping all known blocks into concurrency-safe compaction Jobs.
type Grouper interface {
	// Groups returns the compaction jobs for all blocks currently known to the syncer.
	// It creates all jobs from scratch on every call.
	Groups(blocks map[ulid.ULID]*block.Meta) (res []*Job, err error)
}

// defaultGroupKey returns a unique identifier for the group the block belongs to, based on
// the DefaultGrouper logic. It considers the downsampling resolution and the block's labels.
func defaultGroupKey(res int64, lbls labels.Labels) string {
	return fmt.Sprintf("%d@%v", res, labels.StableHash(lbls))
}
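
// For illustration, a block with downsampling resolution 0 whose external
// labels hash to 12345 (value made up here) gets the key "0@12345"; blocks with
// equal resolution and label sets end up in the same group.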

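// minTime returns the smallest MinTime across the given metas, converted from
// Unix milliseconds to a UTC time. It returns the zero time for an empty slice.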
func minTime(metas []*block.Meta) time.Time {
	if len(metas) == 0 {
		return time.Time{}
	}

	minT := metas[0].MinTime
	for _, meta := range metas {
		if meta.MinTime < minT {
			minT = meta.MinTime
		}
	}

	return time.Unix(0, minT*int64(time.Millisecond)).UTC()
}

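// maxTime returns the largest MaxTime across the given metas, converted from
// Unix milliseconds to a UTC time. It returns the zero time for an empty slice.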
func maxTime(metas []*block.Meta) time.Time {
	if len(metas) == 0 {
		return time.Time{}
	}

	maxT := metas[0].MaxTime
	for _, meta := range metas {
		if meta.MaxTime > maxT {
			maxT = meta.MaxTime
		}
	}

	return time.Unix(0, maxT*int64(time.Millisecond)).UTC()
}

// Planner returns blocks to compact.
type Planner interface {
	// Plan returns a list of blocks that should be compacted into a single one.
	// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
	Plan(ctx context.Context, metasByMinTime []*block.Meta) ([]*block.Meta, error)
}

// Compactor provides compaction against an underlying storage of time series data.
// This is similar to tsdb.Compactor just without the Plan method.
// TODO(bwplotka): Split the Planner from Compactor on upstream as well, so we can import it.
type Compactor interface {
	// Write persists one or more blocks into a directory.
	// No block is written when the resulting block has 0 samples; an empty slice is returned instead.
	// Prometheus always returns one block or none. The interface allows returning more than one
	// block so downstream users can experiment with the compactor.
	Write(dest string, b tsdb.BlockReader, mint, maxt int64, parent *tsdb.BlockMeta) ([]ulid.ULID, error)

	// Compact runs compaction against the provided directories. Must
	// only be called concurrently with results of Plan().
	// Can optionally pass a list of already open blocks,
	// to avoid having to reopen them.
	// Prometheus always returns one block or none. The interface allows returning more than one
	// block so downstream users can experiment with the compactor.
	// When a resulting block has 0 samples:
	//  * No block is written.
	//  * The source dirs are marked Deletable.
	//  * The block is not included in the result.
	Compact(dest string, dirs []string, open []*tsdb.Block) ([]ulid.ULID, error)

	// CompactWithSplitting merges and splits the source blocks into shardCount number of compacted blocks,
	// and returns a slice of block IDs. The position of the returned block ID in the result slice corresponds to the shard index.
	// If the given compacted block has no series, the corresponding block ID will be the zero ULID value.
	CompactWithSplitting(dest string, dirs []string, open []*tsdb.Block, shardCount uint64) (result []ulid.ULID, _ error)
}

// runCompactionJob plans and runs a single compaction against the provided job. The compacted result
// is uploaded into the bucket the blocks were retrieved from.
func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shouldRerun bool, compIDs []ulid.ULID, rerr error) {
	jobBeginTime := time.Now()

	jobType := "merge"
	if job.UseSplitting() {
		jobType = "split"
	}

	jobLogger := log.With(c.logger, "groupKey", job.Key(), "job_type", jobType)
	subDir := filepath.Join(c.compactDir, job.Key())

	blockCount := len(job.metasByMinTime)

	defer func() {
		elapsed := time.Since(jobBeginTime)

		if rerr == nil {
			c.metrics.compactionJobDuration.WithLabelValues(jobType).Observe(elapsed.Seconds())
			c.metrics.compactionJobBlocks.WithLabelValues(jobType).Observe(float64(blockCount))

			level.Info(jobLogger).Log("msg", "compaction job succeeded", "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "block_count", blockCount)
		} else {
			level.Error(jobLogger).Log("msg", "compaction job failed", "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "err", rerr, "block_count", blockCount)
		}

		if err := os.RemoveAll(subDir); err != nil {
			level.Error(jobLogger).Log("msg", "failed to remove compaction group work directory", "path", subDir, "err", err)
		}
	}()

	if err := os.MkdirAll(subDir, 0750); err != nil {
		return false, nil, fmt.Errorf("create compaction job dir: %w", err)
	}

	toCompact, err := c.planner.Plan(ctx, job.metasByMinTime)
	if err != nil {
		return false, nil, fmt.Errorf("plan compaction: %w", err)
	}
	if len(toCompact) == 0 {
		// Nothing to do.
		return false, nil, nil
	}

	// The planner returned some blocks to compact, so we can enrich the logger
	// with the min/max time between all blocks to compact.
	toCompactMinTime := minTime(toCompact)
	toCompactMaxTime := maxTime(toCompact)
	jobLogger = log.With(jobLogger, "minTime", toCompactMinTime.String(), "maxTime", toCompactMaxTime.String())

	var sb strings.Builder
	for i, meta := range toCompact {
		if i > 0 {
			sb.WriteRune(',')
		}
		sb.WriteString(meta.ULID.String())
	}
	toCompactStr := sb.String()

	level.Info(jobLogger).Log("msg", "compaction available and planned; downloading blocks", "block_count", blockCount, "blocks", toCompactStr)

	// Once we have a plan we need to download the actual data.
	downloadBegin := time.Now()

	err = concurrency.ForEachJob(ctx, len(toCompact), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
		meta := toCompact[idx]

		// Must be the same as in blocksToCompactDirs.
		bdir := filepath.Join(subDir, meta.ULID.String())

		if err := block.Download(ctx, jobLogger, c.bkt, meta.ULID, bdir); err != nil {
			return fmt.Errorf("download block %s: %w", meta.ULID, err)
		}

		// Ensure all source blocks are valid.
		stats, err := block.GatherBlockHealthStats(ctx, jobLogger, bdir, meta.MinTime, meta.MaxTime, false)
		if err != nil {
			return fmt.Errorf("gather index issues for block %s: %w", bdir, err)
		}

		if err := stats.CriticalErr(); err != nil {
			return criticalError(fmt.Errorf("block with unhealthy index found %s; Compaction level %v; Labels: %v: %w", bdir, meta.Compaction.Level, meta.Thanos.Labels, err), meta.ULID)
		}

		if err := stats.OutOfOrderChunksErr(); err != nil {
			return outOfOrderChunkError(fmt.Errorf("blocks with out-of-order chunks are dropped from compaction: %s: %w", bdir, err), meta.ULID)
		}

		if err := stats.Issue347OutsideChunksErr(); err != nil {
			return issue347Error{
				err: fmt.Errorf("invalid, but repairable block %s: %w", bdir, err),
				id:  meta.ULID,
			}
		}

		if err := stats.OutOfOrderLabelsErr(); err != nil {
			return fmt.Errorf("block id %s: %w", meta.ULID, err)
		}
		return nil
	})
	if err != nil {
		return false, nil, err
	}

	blocksToCompactDirs := make([]string, len(toCompact))
	var totalBlockSize int64
	for ix, meta := range toCompact {
		blocksToCompactDirs[ix] = filepath.Join(subDir, meta.ULID.String())
		totalBlockSize += meta.BlockBytes()
	}

	elapsed := time.Since(downloadBegin)
	level.Info(jobLogger).Log("msg", "downloaded and verified blocks; compacting blocks", "block_count", blockCount, "blocks", toCompactStr, "total_size_bytes", totalBlockSize, "duration", elapsed, "duration_ms", elapsed.Milliseconds())

	compactionBegin := time.Now()

	if job.UseSplitting() {
		compIDs, err = c.comp.CompactWithSplitting(subDir, blocksToCompactDirs, nil, uint64(job.SplittingShards()))
	} else {
		compIDs, err = c.comp.Compact(subDir, blocksToCompactDirs, nil)
	}
	if err != nil {
		return false, nil, fmt.Errorf("compact blocks %s: %w", toCompactStr, err)
	}

	if !hasNonZeroULIDs(compIDs) {
		// Prometheus compactor found that the compacted block would have no samples.
		level.Info(jobLogger).Log("msg", "compacted block would have no samples, deleting source blocks", "block_count", blockCount, "blocks", toCompactStr)
		for _, meta := range toCompact {
			if meta.Stats.NumSamples == 0 {
				if err := deleteBlock(c.bkt, meta.ULID, filepath.Join(subDir, meta.ULID.String()), jobLogger, c.metrics.blocksMarkedForDeletion); err != nil {
					level.Warn(jobLogger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID, "err", err)
				}
			}
		}
		// Even though this block was empty, there may be more work to do.
		return true, nil, nil
	}

	elapsed = time.Since(compactionBegin)
	level.Info(jobLogger).Log("msg", "compacted blocks", "new_block_count", len(compIDs), "new_blocks", fmt.Sprintf("%v", compIDs), "block_count", blockCount, "blocks", toCompactStr, "duration", elapsed, "duration_ms", elapsed.Milliseconds())

	uploadBegin := time.Now()

	if err = verifyCompactedBlocksTimeRanges(compIDs, toCompactMinTime.UnixMilli(), toCompactMaxTime.UnixMilli(), subDir); err != nil {
		level.Warn(jobLogger).Log("msg", "compacted blocks verification failed", "err", err)
		c.metrics.compactionBlocksVerificationFailed.Inc()
	}

	blocksToUpload := convertCompactionResultToForEachJobs(compIDs, job.UseSplitting(), jobLogger)
	uploadBlocksCount := len(blocksToUpload)

	// Update external labels and verify all result blocks.
	err = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
		blockToUpload := blocksToUpload[idx]
		bdir := filepath.Join(subDir, blockToUpload.ulid.String())

		// When splitting is enabled, we need to inject the shard ID as an external label.
		newLabels := job.Labels().Map()
		if job.UseSplitting() {
			newLabels[block.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(blockToUpload.shardIndex), uint64(job.SplittingShards()))
		}
		blocksToUpload[idx].labels = newLabels

		newMeta, err := block.InjectThanosMeta(jobLogger, bdir, block.ThanosMeta{
			Labels:       newLabels,
			Downsample:   block.ThanosDownsample{Resolution: job.Resolution()},
			Source:       block.CompactorSource,
			SegmentFiles: block.GetSegmentFiles(bdir),
		}, nil)

		if err != nil {
			return fmt.Errorf("failed to finalize the block %s: %w", bdir, err)
		}

		if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
			return fmt.Errorf("remove tombstones: %w", err)
		}

		if err := block.VerifyBlock(ctx, jobLogger, bdir, newMeta.MinTime, newMeta.MaxTime, false); err != nil {
			return fmt.Errorf("invalid result block %s: %w", bdir, err)
		}
		return nil
	})
	if err != nil {
		return false, nil, err
	}

	// Optionally build sparse-index-headers. Building sparse-index-headers is best effort: we do not skip uploading a
	// compacted block if there's an error affecting sparse-index-headers.
	if c.uploadSparseIndexHeaders {
		// Create a bucket backed by the local compaction directory. This allows prepareSparseIndexHeader to
		// construct sparse-index-headers without making requests to object storage.
		if fsbkt, err := filesystem.NewBucket(subDir); err != nil {
			c.metrics.compactionBlocksBuildSparseHeadersFailed.Add(float64(uploadBlocksCount))
			level.Warn(jobLogger).Log("msg", "failed to create filesystem bucket, skipping sparse header upload", "err", err)
		} else {
			// Wrap the filesystem.Bucket in a no-op instrumented bucket, as prepareSparseIndexHeader
			// expects an objstore.InstrumentedBucketReader.
			fsInstrBkt := objstore.WithNoopInstr(fsbkt)
			_ = concurrency.ForEachJob(ctx, uploadBlocksCount, c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
				blockToUpload := blocksToUpload[idx]
				err := prepareSparseIndexHeader(ctx, jobLogger, fsInstrBkt, subDir, blockToUpload.ulid, c.sparseIndexHeaderSamplingRate, c.sparseIndexHeaderconfig)
				if err != nil {
					c.metrics.compactionBlocksBuildSparseHeadersFailed.Inc()
					level.Warn(jobLogger).Log("msg", "failed to create sparse index headers", "block", blockToUpload.ulid.String(), "shard", blockToUpload.shardIndex, "err", err)
				}
				return nil
			})
		}
	}

	// upload all blocks
	c.metrics.blockUploadsStarted.Add(float64(uploadBlocksCount))
	var totalUploadedSize atomic.Int64
	err = concurrency.ForEachJob(ctx, uploadBlocksCount, c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
		blockToUpload := blocksToUpload[idx]
		bdir := filepath.Join(subDir, blockToUpload.ulid.String())
		begin := time.Now()

		opts := []objstore.UploadOption{
			objstore.WithUploadConcurrency(c.maxPerBlockUploadConcurrency),
		}

		uploadedMeta, err := block.Upload(ctx, jobLogger, c.bkt, bdir, nil, opts...)
		if err != nil {
			var uploadErr *block.UploadError
			if errors.As(err, &uploadErr) {
				c.metrics.blockUploadsFailed.WithLabelValues(uploadErr.FileType()).Inc()
			} else {
				c.metrics.blockUploadsFailed.WithLabelValues(string(block.FileTypeUnknown)).Inc()
			}
			return fmt.Errorf("upload of %s failed: %w", blockToUpload.ulid, err)
		}

		elapsed := time.Since(begin)
		c.metrics.blockUploadsDuration.WithLabelValues(jobType).Observe(elapsed.Seconds())

		blockSize := int64(0)
		seriesCount := uint64(0)
		sampleCount := uint64(0)
		compactionLevel := 0

		if uploadedMeta != nil {
			seriesCount = uploadedMeta.Stats.NumSeries
			sampleCount = uploadedMeta.Stats.NumSamples
			compactionLevel = uploadedMeta.Compaction.Level
			blockSize = uploadedMeta.BlockBytes()
			totalUploadedSize.Add(blockSize)
		}

		level.Info(jobLogger).Log(
			"msg", "uploaded block",
			"result_block", blockToUpload.ulid,
			"size_bytes", blockSize,
			"series_count", seriesCount,
			"sample_count", sampleCount,
			"compaction_level", compactionLevel,
			"duration", elapsed,
			"duration_ms", elapsed.Milliseconds(),
			"external_labels", labels.FromMap(blockToUpload.labels))
		return nil
	})
	if err != nil {
		return false, nil, err
	}

	elapsed = time.Since(uploadBegin)
	level.Info(jobLogger).Log("msg", "uploaded all blocks", "blocks", uploadBlocksCount, "total_size_bytes", totalUploadedSize.Load(), "duration", elapsed, "duration_ms", elapsed.Milliseconds())

	// Mark the blocks we just compacted for deletion from the job and the bucket, so they do not get included
	// in the next planning cycle.
	// Eventually the blocks we just uploaded should get synced into the job again (subject to the sync delay).
	for _, meta := range toCompact {
		attrs, err := block.GetMetaAttributes(ctx, meta, c.bkt)
		if err != nil {
			level.Warn(jobLogger).Log("msg", "failed to determine block upload time", "block", meta.ULID.String(), "err", err)
		} else {
			c.metrics.blockCompactionDelay.WithLabelValues(strconv.Itoa(meta.Compaction.Level)).Observe(compactionBegin.Sub(attrs.LastModified).Seconds())
		}

		if err := deleteBlock(c.bkt, meta.ULID, filepath.Join(subDir, meta.ULID.String()), jobLogger, c.metrics.blocksMarkedForDeletion); err != nil {
			return false, nil, fmt.Errorf("mark old block for deletion from bucket: %w", err)
		}
	}
	return true, compIDs, nil
}

func prepareSparseIndexHeader(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, id ulid.ULID, sampling int, cfg indexheader.Config) error {
	// Calling NewStreamBinaryReader reads a block's index and writes a sparse-index-header to disk.
	mets := indexheader.NewStreamBinaryReaderMetrics(nil)
	br, err := indexheader.NewStreamBinaryReader(ctx, logger, bkt, dir, id, sampling, mets, cfg)
	if err != nil {
		return err
	}
	return br.Close()
}

// verifyCompactedBlocksTimeRanges does a full pass over the compacted blocks
// and verifies that they satisfy the min/maxTime boundaries of the source blocks.
func verifyCompactedBlocksTimeRanges(compIDs []ulid.ULID, sourceBlocksMinTime, sourceBlocksMaxTime int64, subDir string) error {
	sourceBlocksMinTimeFound := false
	sourceBlocksMaxTimeFound := false

	for _, compID := range compIDs {
		// Skip empty block
		if compID == (ulid.ULID{}) {
			continue
		}

		bdir := filepath.Join(subDir, compID.String())
		meta, err := block.ReadMetaFromDir(bdir)
		if err != nil {
			return fmt.Errorf("failed to read meta.json from %s during block time range verification: %w", bdir, err)
		}

		// Ensure the compacted block's min/maxTime is within the source blocks' min/maxTime.
		if meta.MinTime < sourceBlocksMinTime {
			return fmt.Errorf("invalid minTime for block %s, compacted block minTime %d is before source minTime %d", compID.String(), meta.MinTime, sourceBlocksMinTime)
		}

		if meta.MaxTime > sourceBlocksMaxTime {
			return fmt.Errorf("invalid maxTime for block %s, compacted block maxTime %d is after source maxTime %d", compID.String(), meta.MaxTime, sourceBlocksMaxTime)
		}

		if meta.MinTime == sourceBlocksMinTime {
			sourceBlocksMinTimeFound = true
		}

		if meta.MaxTime == sourceBlocksMaxTime {
			sourceBlocksMaxTimeFound = true
		}
	}

	// Check that the minTime and maxTime from the source blocks
	// are found at least once in the compacted blocks
	if !sourceBlocksMinTimeFound || !sourceBlocksMaxTimeFound {
		return fmt.Errorf("compacted block(s) do not contain minTime %d and maxTime %d from the source blocks", sourceBlocksMinTime, sourceBlocksMaxTime)
	}

	return nil
}

// convertCompactionResultToForEachJobs filters out empty ULIDs.
// When handling the result of a split compaction, the shard index is the block's position in the slice returned by the compaction.
func convertCompactionResultToForEachJobs(compactedBlocks []ulid.ULID, splitJob bool, jobLogger log.Logger) []ulidWithShardIndex {
	result := make([]ulidWithShardIndex, 0, len(compactedBlocks))

	for ix, id := range compactedBlocks {
		// Skip if it's an empty block.
		if id == (ulid.ULID{}) {
			if splitJob {
				level.Info(jobLogger).Log("msg", "compaction produced an empty block", "shard_id", sharding.FormatShardIDLabelValue(uint64(ix), uint64(len(compactedBlocks))))
			} else {
				level.Info(jobLogger).Log("msg", "compaction produced an empty block")
			}
			continue
		}

		result = append(result, ulidWithShardIndex{shardIndex: ix, ulid: id})
	}
	return result
}
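
// For example, a 3-shard split result [A, <zero ULID>, B] is converted to
// [{shardIndex: 0, ulid: A}, {shardIndex: 2, ulid: B}].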

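// ulidWithShardIndex pairs a compacted block ULID with the shard index it was
// produced for. labels holds the external labels assigned to the block before upload.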
type ulidWithShardIndex struct {
	ulid       ulid.ULID
	shardIndex int
	labels     map[string]string
}

// issue347Error is a type wrapper for errors that should invoke the repair process for a broken block.
type issue347Error struct {
	err error
	id  ulid.ULID
}

func (e issue347Error) Error() string {
	return fmt.Sprintf("%s (block: %s)", e.err.Error(), e.id.String())
}

// isIssue347Error returns true if the base error is an issue347Error.
func isIssue347Error(err error) (bool, issue347Error) {
	var ie issue347Error
	ok := errors.As(err, &ie)
	return ok, ie
}

// OutOfOrderChunksError is a type wrapper for OOO chunk error from validating block index.
type OutOfOrderChunksError struct {
	err error
	id  ulid.ULID
}

func (e OutOfOrderChunksError) Error() string {
	return fmt.Sprintf("%s (block: %s)", e.err.Error(), e.id.String())
}

func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksError {
	return OutOfOrderChunksError{err: err, id: brokenBlock}
}

// IsOutOfOrderChunkError returns true if the base error is an OutOfOrderChunksError.
func IsOutOfOrderChunkError(err error) (bool, OutOfOrderChunksError) {
	var outOfOrderChunksErr OutOfOrderChunksError
	ok := errors.As(err, &outOfOrderChunksErr)
	return ok, outOfOrderChunksErr
}

// CriticalError is a type wrapper for block health critical errors.
type CriticalError struct {
	err error
	id  ulid.ULID
}

func (e CriticalError) Error() string {
	return fmt.Sprintf("%s (block: %s)", e.err.Error(), e.id.String())
}

func criticalError(err error, brokenBlock ulid.ULID) CriticalError {
	return CriticalError{err: err, id: brokenBlock}
}

// IsCriticalError returns true if the base error is a CriticalError.
func IsCriticalError(err error) (bool, CriticalError) {
	var criticalErr CriticalError
	ok := errors.As(err, &criticalErr)
	return ok, criticalErr
}

// repairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error.
func repairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, ie issue347Error) error {
	level.Info(logger).Log("msg", "Repairing block broken by https://github.com/prometheus/tsdb/issues/347", "id", ie.id, "err", ie)

	tmpdir, err := os.MkdirTemp("", fmt.Sprintf("repair-issue-347-id-%s-", ie.id))
	if err != nil {
		return err
	}

	defer func() {
		if err := os.RemoveAll(tmpdir); err != nil {
			level.Warn(logger).Log("msg", "failed to remote tmpdir", "err", err, "tmpdir", tmpdir)
		}
	}()

	bdir := filepath.Join(tmpdir, ie.id.String())
	if err := block.Download(ctx, logger, bkt, ie.id, bdir); err != nil {
		return fmt.Errorf("download block %s: %w", ie.id, err)
	}

	meta, err := block.ReadMetaFromDir(bdir)
	if err != nil {
		return fmt.Errorf("read meta from %s: %w", bdir, err)
	}

	resid, err := block.Repair(ctx, logger, tmpdir, ie.id, block.CompactorRepairSource, false, block.IgnoreIssue347OutsideChunk)
	if err != nil {
		return fmt.Errorf("repair failed for block %s: %w", ie.id, err)
	}

	// Verify the repaired block before uploading it.
	if err := block.VerifyBlock(ctx, logger, filepath.Join(tmpdir, resid.String()), meta.MinTime, meta.MaxTime, false); err != nil {
		return fmt.Errorf("repaired block is invalid %s: %w", resid, err)
	}

	level.Info(logger).Log("msg", "uploading repaired block", "newID", resid)
	if _, err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String()), nil); err != nil {
		return fmt.Errorf("upload of %s failed: %w", resid, err)
	}

	level.Info(logger).Log("msg", "deleting broken block", "id", ie.id)

	// Spawn a new context so we always mark a block for deletion in full on shutdown.
	delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	// TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this).
	if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id, "source of repaired block", blocksMarkedForDeletion); err != nil {
		return fmt.Errorf("marking old block %s for deletion has failed: %w", ie.id, err)
	}
	return nil
}

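// deleteBlock removes the local copy of the block and marks the block in the bucket
// for deletion; the actual bucket-side deletion happens later, once the deletion
// marker is processed.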
func deleteBlock(bkt objstore.Bucket, id ulid.ULID, bdir string, logger log.Logger, blocksMarkedForDeletion prometheus.Counter) error {
	if err := os.RemoveAll(bdir); err != nil {
		return fmt.Errorf("remove old block dir %s: %w", id, err)
	}

	// Spawn a new context so we always mark a block for deletion in full on shutdown.
	delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	level.Info(logger).Log("msg", "marking compacted block for deletion", "old_block", id)
	if err := block.MarkForDeletion(delCtx, logger, bkt, id, "source of compacted block", blocksMarkedForDeletion); err != nil {
		return fmt.Errorf("mark block %s for deletion from bucket: %w", id, err)
	}
	return nil
}

// BucketCompactorMetrics holds the metrics tracked by BucketCompactor.
type BucketCompactorMetrics struct {
	groupCompactionRunsStarted               prometheus.Counter
	groupCompactionRunsCompleted             prometheus.Counter
	groupCompactionRunsFailed                prometheus.Counter
	groupCompactions                         prometheus.Counter
	blockCompactionDelay                     *prometheus.HistogramVec
	compactionBlocksVerificationFailed       prometheus.Counter
	compactionBlocksBuildSparseHeadersFailed prometheus.Counter
	blocksMarkedForDeletion                  prometheus.Counter
	blocksMarkedForNoCompact                 *prometheus.CounterVec
	blocksMaxTimeDelta                       prometheus.Histogram
	compactionJobDuration                    *prometheus.HistogramVec
	compactionJobBlocks                      *prometheus.HistogramVec
	blockUploadsStarted                      prometheus.Counter
	blockUploadsFailed                       *prometheus.CounterVec
	blockUploadsDuration                     *prometheus.HistogramVec
}

// NewBucketCompactorMetrics makes a new BucketCompactorMetrics.
func NewBucketCompactorMetrics(blocksMarkedForDeletion prometheus.Counter, reg prometheus.Registerer) *BucketCompactorMetrics {
	bcm := &BucketCompactorMetrics{
		groupCompactionRunsStarted: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_group_compaction_runs_started_total",
			Help: "Total number of group compaction attempts.",
		}),
		groupCompactionRunsCompleted: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_group_compaction_runs_completed_total",
			Help: "Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction.",
		}),
		groupCompactionRunsFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_group_compactions_failures_total",
			Help: "Total number of failed group compactions.",
		}),
		groupCompactions: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_group_compactions_total",
			Help: "Total number of group compaction attempts that resulted in new block(s).",
		}),
		blockCompactionDelay: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:                            "cortex_compactor_block_compaction_delay_seconds",
			Help:                            "Delay between a block being uploaded and successfully compacting it.",
			Buckets:                         []float64{60.0, 300.0, 600.0, 1800.0, 3600.0, 7200.0, 10800.0, 14400.0, 18000.0, 36000.0, 72000.0},
			NativeHistogramBucketFactor:     1.1,
			NativeHistogramMaxBucketNumber:  100,
			NativeHistogramMinResetDuration: 1 * time.Hour,
		}, []string{"level"}),
		compactionJobDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:                            "cortex_compactor_compaction_job_duration_seconds",
			Help:                            "Duration of successful compaction job.",
			Buckets:                         []float64{1 * 60, 5 * 60, 10 * 60, 15 * 60, 20 * 60, 30 * 60, 45 * 60, 60 * 60, 90 * 60},
			NativeHistogramBucketFactor:     1.1,
			NativeHistogramMaxBucketNumber:  100,
			NativeHistogramMinResetDuration: 1 * time.Hour,
		}, []string{"type"}),
		compactionJobBlocks: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:                            "cortex_compactor_compaction_job_blocks",
			Help:                            "Number of compacted blocks in successful compaction jobs.",
			Buckets:                         []float64{4, 8, 16, 24, 32, 40, 48, 56, 64, 96},
			NativeHistogramBucketFactor:     1.1,
			NativeHistogramMaxBucketNumber:  100,
			NativeHistogramMinResetDuration: 1 * time.Hour,
		}, []string{"type"}),
		compactionBlocksVerificationFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_blocks_verification_failures_total",
			Help: "Total number of failures when verifying min/max time ranges of compacted blocks.",
		}),
		compactionBlocksBuildSparseHeadersFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_build_sparse_headers_failures_total",
			Help: "Total number of failures when building sparse index headers.",
		}),
		blocksMarkedForDeletion: blocksMarkedForDeletion,
		blocksMarkedForNoCompact: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_compactor_blocks_marked_for_no_compaction_total",
			Help: "Total number of blocks that were marked for no-compaction.",
		}, []string{"reason"}),
		blocksMaxTimeDelta: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			Name:    "cortex_compactor_block_max_time_delta_seconds",
			Help:    "Difference between now and the max time of a block being compacted in seconds.",
			Buckets: prometheus.LinearBuckets(86400, 43200, 8), // 1 day to 4.5 days, in 12-hour steps
		}),
		blockUploadsStarted: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_block_uploads_started_total",
			Help: "Total number of block uploads started.",
		}),
		blockUploadsFailed: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_compactor_block_uploads_failed_total",
			Help: "Total number of block uploads failed.",
		}, []string{"type"}),
		blockUploadsDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:                            "cortex_compactor_block_upload_duration_seconds",
			Help:                            "Duration of successful block uploads.",
			Buckets:                         prometheus.ExponentialBuckets(0.004, 2, 14), // 4ms to 32768ms
			NativeHistogramBucketFactor:     1.1,
			NativeHistogramMaxBucketNumber:  100,
			NativeHistogramMinResetDuration: 1 * time.Hour,
		}, []string{"type"}),
	}
	bcm.blocksMarkedForNoCompact.WithLabelValues(block.OutOfOrderChunksNoCompactReason).Add(0)
	bcm.blocksMarkedForNoCompact.WithLabelValues(block.CriticalNoCompactReason).Add(0)

	return bcm
}

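// ownCompactionJobFunc reports whether the given compaction job is owned by this
// compactor instance and should therefore be processed by it.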
type ownCompactionJobFunc func(job *Job) (bool, error)

// ownAllJobs is an ownCompactionJobFunc that always returns true.
var ownAllJobs = func(*Job) (bool, error) {
	return true, nil
}

// BucketCompactor compacts blocks in a bucket.
type BucketCompactor struct {
	logger                        log.Logger
	sy                            *metaSyncer
	grouper                       Grouper
	comp                          Compactor
	planner                       Planner
	compactDir                    string
	bkt                           objstore.Bucket
	concurrency                   int
	skipUnhealthyBlocks           bool
	uploadSparseIndexHeaders      bool
	sparseIndexHeaderSamplingRate int
	maxPerBlockUploadConcurrency  int
	sparseIndexHeaderconfig       indexheader.Config
	ownJob                        ownCompactionJobFunc
	sortJobs                      JobsOrderFunc
	waitPeriod                    time.Duration
	skipFutureMaxTime             bool
	blockSyncConcurrency          int
	metrics                       *BucketCompactorMetrics
}

// NewBucketCompactor creates a new bucket compactor.
func NewBucketCompactor(
	logger log.Logger,
	sy *metaSyncer,
	grouper Grouper,
	planner Planner,
	comp Compactor,
	compactDir string,
	bkt objstore.Bucket,
	concurrency int,
	skipUnhealthyBlocks bool,
	ownJob ownCompactionJobFunc,
	sortJobs JobsOrderFunc,
	waitPeriod time.Duration,
	skipFutureMaxTime bool,
	blockSyncConcurrency int,
	metrics *BucketCompactorMetrics,
	uploadSparseIndexHeaders bool,
	sparseIndexHeaderSamplingRate int,
	sparseIndexHeaderconfig indexheader.Config,
	maxPerBlockUploadConcurrency int,
) (*BucketCompactor, error) {
	if concurrency <= 0 {
		return nil, fmt.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
	}

	if maxPerBlockUploadConcurrency <= 0 {
		return nil, fmt.Errorf("invalid per block upload concurrency level (%d), concurrency must be > 0", maxPerBlockUploadConcurrency)
	}

	return &BucketCompactor{
		logger:                        logger,
		sy:                            sy,
		grouper:                       grouper,
		planner:                       planner,
		comp:                          comp,
		compactDir:                    compactDir,
		bkt:                           bkt,
		concurrency:                   concurrency,
		skipUnhealthyBlocks:           skipUnhealthyBlocks,
		ownJob:                        ownJob,
		sortJobs:                      sortJobs,
		waitPeriod:                    waitPeriod,
		skipFutureMaxTime:             skipFutureMaxTime,
		blockSyncConcurrency:          blockSyncConcurrency,
		metrics:                       metrics,
		uploadSparseIndexHeaders:      uploadSparseIndexHeaders,
		sparseIndexHeaderSamplingRate: sparseIndexHeaderSamplingRate,
		sparseIndexHeaderconfig:       sparseIndexHeaderconfig,
		maxPerBlockUploadConcurrency:  maxPerBlockUploadConcurrency,
	}, nil
}

// Compact runs compaction over bucket.
// If maxCompactionTime is positive then after this time no more new compactions are started.
func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Duration) (rerr error) {
	defer func() {
		// Do not remove the compactDir if an error has occurred
		// because potentially on the next run we would not have to download
		// everything again.
		if rerr != nil {
			return
		}
		if err := os.RemoveAll(c.compactDir); err != nil {
			level.Error(c.logger).Log("msg", "failed to remove compaction work directory", "path", c.compactDir, "err", err)
		}
	}()

	// Keep this channel outside the compaction loop, because we want the "max compaction time"
	// to apply on compactions across multiple consecutive loops. This channel is initialised
	// after the first planning.
	var maxCompactionTimeChan <-chan time.Time

	// Loop over bucket and compact until there's no work left.
	for {
		var (
			wg                     sync.WaitGroup
			workCtx, workCtxCancel = context.WithCancelCause(ctx)
			jobChan                = make(chan *Job)
			errChan                = make(chan error, c.concurrency)
			finishedAllJobs        = true
			mtx                    sync.Mutex
		)

		defer workCtxCancel(errCompactionIterationCancelled)

		// Set up workers which will compact the jobs when the jobs are ready.
		// They will compact available jobs until they encounter an error, after which they will stop.
		for i := 0; i < c.concurrency; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for g := range jobChan {
					// Ensure the job is still owned by the current compactor instance.
					// If not, we shouldn't run it because another compactor instance may already
					// be processing it (or will start soon).
					if ok, err := c.ownJob(g); err != nil {
						level.Info(c.logger).Log("msg", "skipped compaction because unable to check whether the job is owned by the compactor instance", "groupKey", g.Key(), "err", err)
						continue
					} else if !ok {
						level.Info(c.logger).Log("msg", "skipped compaction because job is not owned by the compactor instance anymore", "groupKey", g.Key())
						continue
					}

					c.metrics.groupCompactionRunsStarted.Inc()

					shouldRerunJob, compactedBlockIDs, err := c.runCompactionJob(workCtx, g)
					if err == nil {
						c.metrics.groupCompactionRunsCompleted.Inc()
						if hasNonZeroULIDs(compactedBlockIDs) {
							c.metrics.groupCompactions.Inc()
						}

						if shouldRerunJob {
							mtx.Lock()
							finishedAllJobs = false
							mtx.Unlock()
						}
						continue
					}

					// At this point the compaction has failed.
					c.metrics.groupCompactionRunsFailed.Inc()

					if ok, issue347Err := isIssue347Error(err); ok {
						if err := repairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, issue347Err); err == nil {
							mtx.Lock()
							finishedAllJobs = false
							mtx.Unlock()
							continue
						}
					}
					// If the block has an out-of-order chunk and we have been configured to skip it,
					// then we can mark the block for no compaction so that the next compaction run
					// will skip it.
					if ok, outOfOrderChunksErr := IsOutOfOrderChunkError(err); ok && c.skipUnhealthyBlocks {
						err := block.MarkForNoCompact(
							ctx,
							c.logger,
							c.bkt,
							outOfOrderChunksErr.id,
							block.OutOfOrderChunksNoCompactReason,
							"OutofOrderChunk: marking block with out-of-order series/chunks as no compact to unblock compaction",
							c.metrics.blocksMarkedForNoCompact.WithLabelValues(block.OutOfOrderChunksNoCompactReason),
						)
						if err == nil {
							mtx.Lock()
							finishedAllJobs = false
							mtx.Unlock()
							continue
						}
					}

					// In case an unhealthy block is found, we mark it for no compaction
					// to unblock future compaction runs.
					if ok, criticalErr := IsCriticalError(err); ok && c.skipUnhealthyBlocks {
						err := block.MarkForNoCompact(
							ctx,
							c.logger,
							c.bkt,
							criticalErr.id,
							block.CriticalNoCompactReason,
							"UnhealthyBlock: marking unhealthy block as no compact to unblock compaction",
							c.metrics.blocksMarkedForNoCompact.WithLabelValues(block.CriticalNoCompactReason),
						)
						if err == nil {
							mtx.Lock()
							finishedAllJobs = false
							mtx.Unlock()
							continue
						}
					}

					errChan <- fmt.Errorf("group %s: %w", g.Key(), err)
					return
				}
			}()
		}

		level.Info(c.logger).Log("msg", "start sync of metas")
		if err := c.sy.SyncMetas(ctx); err != nil {
			return fmt.Errorf("sync: %w", err)
		}

		level.Info(c.logger).Log("msg", "start of GC")
		// Blocks that were compacted are garbage collected after each compaction run.
		// However, if the compactor crashes, we need to resolve those on startup.
		if err := c.sy.GarbageCollect(ctx); err != nil {
			return fmt.Errorf("blocks garbage collect: %w", err)
		}

		jobs, err := c.grouper.Groups(c.sy.Metas())
		if err != nil {
			return fmt.Errorf("build compaction jobs: %w", err)
		}

		// There is another ownership check just before we start processing each job, but filtering here
		// avoids sending unowned jobs to the goroutines in the first place.
		jobs, err = c.filterOwnJobs(jobs)
		if err != nil {
			return err
		}

		// Record the difference between now and the max time for a block being compacted. This
		// is used to detect compactors not being able to keep up with the rate of blocks being
		// created. The idea is that most blocks should be from within the last 24h or 48h.
		now := time.Now()
		for _, delta := range c.blockMaxTimeDeltas(now, jobs) {
			c.metrics.blocksMaxTimeDelta.Observe(delta)
		}

		// Skip jobs for which the wait period hasn't been honored yet.
		jobs = c.filterJobsByWaitPeriod(ctx, jobs)

		// Sort jobs based on the configured ordering algorithm.
		jobs = c.sortJobs(jobs)

		ignoreDirs := []string{}
		for _, gr := range jobs {
			for _, grID := range gr.IDs() {
				ignoreDirs = append(ignoreDirs, filepath.Join(gr.Key(), grID.String()))
			}
		}

		if err := runutil.DeleteAll(c.compactDir, ignoreDirs...); err != nil {
			level.Warn(c.logger).Log("msg", "failed deleting non-compaction job directories/files, some disk space usage might have leaked. Continuing", "err", err, "dir", c.compactDir)
		}

		level.Info(c.logger).Log("msg", "start of compactions")

		// Start the max compaction timer only after the first planning has been completed. This is important
		// in case the metas syncing (or planning in general) is slow. If we started the timer before the first
		// planning, we could end up in a situation where the planning takes longer than "max compaction time"
		// and we would never run compactions at all for a given tenant.
		if maxCompactionTimeChan == nil && maxCompactionTime > 0 {
			maxCompactionTimeChan = time.After(maxCompactionTime)
		}

		maxCompactionTimeReached := false
		// Send all jobs found during this pass to the compaction workers.
		var jobErrs multierror.MultiError
	jobLoop:
		for _, g := range jobs {
			select {
			case jobErr := <-errChan:
				jobErrs.Add(jobErr)
				break jobLoop
			case jobChan <- g:
			case <-maxCompactionTimeChan:
				maxCompactionTimeReached = true
				level.Info(c.logger).Log("msg", "max compaction time reached, no more compactions will be started")
				break jobLoop
			}
		}
		close(jobChan)
		wg.Wait()

		// Collect any other error reported by the workers, or any error reported
		// while we were waiting for the last batch of jobs to run the compaction.
		close(errChan)
		for jobErr := range errChan {
			jobErrs.Add(jobErr)
		}

		workCtxCancel(errCompactionIterationStopped)
		if len(jobErrs) > 0 {
			return jobErrs.Err()
		}

		if maxCompactionTimeReached || finishedAllJobs {
			break
		}
	}
	level.Info(c.logger).Log("msg", "compaction iterations done")
	return nil
}

// blockMaxTimeDeltas returns a slice of the difference between now and the MaxTime of each
// block that will be compacted as part of the provided jobs, in seconds.
func (c *BucketCompactor) blockMaxTimeDeltas(now time.Time, jobs []*Job) []float64 {
	var out []float64

	for _, j := range jobs {
		for _, m := range j.Metas() {
			out = append(out, now.Sub(time.UnixMilli(m.MaxTime)).Seconds())
		}
	}

	return out
}

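// filterOwnJobs filters the given jobs in place, keeping only those owned by this
// compactor instance.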
func (c *BucketCompactor) filterOwnJobs(jobs []*Job) ([]*Job, error) {
	for ix := 0; ix < len(jobs); {
		// Skip any job which doesn't belong to this compactor instance.
		if ok, err := c.ownJob(jobs[ix]); err != nil {
			return nil, fmt.Errorf("ownJob: %w", err)
		} else if !ok {
			jobs = append(jobs[:ix], jobs[ix+1:]...)
		} else {
			ix++
		}
	}
	return jobs, nil
}

// filterJobsByWaitPeriod filters out jobs for which the configured wait period hasn't been honored yet.
func (c *BucketCompactor) filterJobsByWaitPeriod(ctx context.Context, jobs []*Job) []*Job {
	for i := 0; i < len(jobs); {
		if elapsed, notElapsedBlock, err := jobWaitPeriodElapsed(ctx, jobs[i], c.waitPeriod, c.skipFutureMaxTime, c.bkt); err != nil {
			level.Warn(c.logger).Log("msg", "not enforcing compaction wait period because the check if compaction job contains recently uploaded blocks has failed", "groupKey", jobs[i].Key(), "err", err)

			// Keep the job.
			i++
		} else if !elapsed {
			level.Info(c.logger).Log("msg", "skipping compaction job because blocks in this job were uploaded too recently (within wait period)", "groupKey", jobs[i].Key(), "waitPeriodNotElapsedFor", notElapsedBlock.String())
			jobs = append(jobs[:i], jobs[i+1:]...)
		} else {
			i++
		}
	}

	return jobs
}

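// Compile-time check that NoCompactionMarkFilter implements block.MetadataFilter.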
var _ block.MetadataFilter = &NoCompactionMarkFilter{}

// NoCompactionMarkFilter is a block.Fetcher filter that finds all blocks with no-compact marker files, and
// removes them from synced metas.
type NoCompactionMarkFilter struct {
	bkt                objstore.InstrumentedBucketReader
	noCompactMarkedMap map[ulid.ULID]struct{}
}

// NewNoCompactionMarkFilter creates NoCompactionMarkFilter.
func NewNoCompactionMarkFilter(bkt objstore.InstrumentedBucketReader) *NoCompactionMarkFilter {
	return &NoCompactionMarkFilter{
		bkt: bkt,
	}
}

// NoCompactMarkedBlocks returns block ids that were marked for no compaction.
// It is safe to call this method only after Filter has finished, and it is also safe to manipulate the map between calls to Filter.
func (f *NoCompactionMarkFilter) NoCompactMarkedBlocks() map[ulid.ULID]struct{} {
	return f.noCompactMarkedMap
}

// Filter finds blocks that should not be compacted, fills f.noCompactMarkedMap, and removes those blocks
// from metas. (The Thanos version of this filter doesn't do the removal.)
func (f *NoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error {
	noCompactMarkedMap := make(map[ulid.ULID]struct{})

	// Find all no-compact markers in the storage.
	err := f.bkt.Iter(ctx, block.MarkersPathname+"/", func(name string) error {
		if err := ctx.Err(); err != nil {
			return err
		}

		if blockID, ok := block.IsNoCompactMarkFilename(path.Base(name)); ok {
			_, exists := metas[blockID]
			if exists {
				noCompactMarkedMap[blockID] = struct{}{}
				synced.WithLabelValues(block.MarkedForNoCompactionMeta).Inc()

				delete(metas, blockID)
			}

		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("list block no-compact marks: %w", err)
	}

	f.noCompactMarkedMap = noCompactMarkedMap
	return nil
}

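// hasNonZeroULIDs returns true if ids contains at least one non-zero ULID. The zero
// ULID is used as a placeholder for compacted blocks with no series (see CompactWithSplitting).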
func hasNonZeroULIDs(ids []ulid.ULID) bool {
	for _, id := range ids {
		if id != (ulid.ULID{}) {
			return true
		}
	}

	return false
}
